package com.atguigu.gmall.realtime.app.dwd.db;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.BaseDbTableProcessFunction;
import com.atguigu.gmall.realtime.beans.BaseDbTableProcess;
import com.atguigu.gmall.realtime.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/**
 * Dynamic routing of fact-table records.
 *
 * <p>Reads JSON records from the Kafka topic {@code topic_db}, connects them with a broadcast
 * stream of configuration rows read from MySQL through Flink CDC, and writes every record emitted
 * by {@link BaseDbTableProcessFunction} to the Kafka topic named by its {@code sink_table} field.
 *
 * <p>Processes that must be running: zk, kafka, maxwell, BaseDbApp.
 *
 * @author Felix
 * @date 2023/11/14
 */
public class BaseDbApp {
    /**
     * Builds the streaming topology and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {
        // 1. Basic environment setup
        // 1.1 Obtain the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Set the parallelism
        env.setParallelism(4);

        // TODO 2. Checkpoint-related settings (omitted)

        // 3. Read data from the Kafka topic
        // 3.1 Declare the topic to consume and the consumer group
        String topic = "topic_db";
        String groupId = "base_db_group";
        // 3.2 Create the Kafka source
        KafkaSource<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        // 3.3 Consume the data and wrap it as a stream
        DataStreamSource<String> kafkaStrDS =
            env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");

        // 4. Convert each record from String to JSONObject and apply a simple ETL filter
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaStrDS.process(
            new ProcessFunction<String, JSONObject>() {
                @Override
                public void processElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
                    JSONObject jsonObj;
                    try {
                        jsonObj = JSON.parseObject(jsonStr);
                    } catch (Exception e) {
                        // Best-effort ETL: a record that is not valid JSON is reported and dropped.
                        // Only the parse is guarded, so failures raised while emitting a record
                        // are no longer swallowed here.
                        e.printStackTrace();
                        return;
                    }
                    if (jsonObj == null) {
                        // The parser produced no object for this input; nothing to forward.
                        return;
                    }
                    String type = jsonObj.getString("type");
                    // Drop records without a type and records whose type starts with "bootstrap-".
                    if (type != null && !type.startsWith("bootstrap-")) {
                        out.collect(jsonObj);
                    }
                }
            }
        );
        // jsonObjDS.print(">>>>");

        // 5. Read the configuration table with Flink CDC
        // 5.1 Create the MySqlSource
        // NOTE(review): host, database and credentials are hard-coded here; consider moving them
        // to external configuration.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .databaseList("gmall0529_config")
            .tableList("gmall0529_config.table_process_dwd")
            .username("root")
            .password("123456")
            .startupOptions(StartupOptions.initial())
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

        // 5.2 Read the data and wrap it as a stream
        DataStreamSource<String> mysqlStrDS =
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        // mysqlStrDS.print(">>>>");

        // 6. Broadcast the configuration stream
        MapStateDescriptor<String, BaseDbTableProcess> mapStateDescriptor =
            new MapStateDescriptor<>("mapStateDescriptor", String.class, BaseDbTableProcess.class);
        BroadcastStream<String> broadcastDS = mysqlStrDS.broadcast(mapStateDescriptor);

        // 7. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectDS = jsonObjDS.connect(broadcastDS);

        // 8. Process the connected streams
        SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
            new BaseDbTableProcessFunction(mapStateDescriptor)
        );

        // 9. Send the fact-table records with simple processing logic to different Kafka topics
        realDS.print(">>>>");
        realDS
            .sinkTo(MyKafkaUtil.getKafkaSinkBySchema(new KafkaRecordSerializationSchema<JSONObject>() {
                @Nullable
                @Override
                public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, KafkaSinkContext context, Long timestamp) {
                    // The target topic travels inside the record as "sink_table"; it is removed
                    // so that it is not part of the payload written to Kafka.
                    String sinkTopic = jsonObj.getString("sink_table");
                    jsonObj.remove("sink_table");
                    // Encode explicitly as UTF-8 so the bytes do not depend on the JVM's
                    // platform default charset.
                    byte[] payload = jsonObj.toJSONString().getBytes(StandardCharsets.UTF_8);
                    return new ProducerRecord<>(sinkTopic, payload);
                }
            }));
        env.execute();
    }
}
